{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Titanic Dataset for the Feature Store"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This notebook prepares the Titanic dataset to be used with the feature store.\n",
    "\n",
    "The Titanic dataset contains information about the passengers of the famous Titanic ship. The training and test data come in form of two CSV files, which can be downloaded from the Titanic Competition page on [Kaggle](https://www.kaggle.com/c/titanic/data).\n",
    "\n",
    "Download the `train.csv` and `test.csv` files, and upload them to the `Resources` folder of your Hopsworks Project. If you prefer doing things using GUIs, then you can find the `Resources` by opening the **Data Sets** tab on the left menu bar.\n",
    "\n",
    "Once you have the two files uploaded on the `Resources` folder, you can proceed with the rest of the notebook."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Starting Spark application\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<table>\n",
       "<tr><th>ID</th><th>YARN Application ID</th><th>Kind</th><th>State</th><th>Spark UI</th><th>Driver log</th><th>Current session?</th></tr><tr><td>5</td><td>application_1568986312014_0006</td><td>pyspark</td><td>idle</td><td><a target=\"_blank\" href=\"http://hopsworks0.logicalclocks.com:8088/proxy/application_1568986312014_0006/\">Link</a></td><td><a target=\"_blank\" href=\"http://hopsworks0.logicalclocks.com:8042/node/containerlogs/container_e01_1568986312014_0006_01_000001/maggy_ablation__meb10000\">Link</a></td><td>✔</td></tr></table>"
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "SparkSession available as 'spark'.\n"
     ]
    }
   ],
   "source": [
    "import tensorflow as tf\n",
    "from hops import hdfs\n",
    "from hops import featurestore\n",
    "from pyspark.sql import functions as F"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's begin by reading the training data into a Spark DataFrame:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "# read the training data csv into a Spark DataFrame\n",
    "\n",
    "training_csv_path = hdfs.abs_path('') + 'Resources/' + 'titanic-train.csv'\n",
    "raw_df = spark.read.csv(training_csv_path, header=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now, we can do some simple preprocessing. Rather than registering the whole dataset with the Feature Store, we just select a few of the columns, and cast all columns to `int`. Since the values of the `sex` column are either `male` or `female`, we also convert them to `0` or `1`, respectively. We also fill the missing values of the `age` column with `30`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "# simple preprocessing:\n",
    "#     1 - selecting a few of the columns\n",
    "#     2 - Filling the missing 'age' values with 30\n",
    "#     3 - changing sex to 0 or 1\n",
    "#     4 - casting all columns to int\n",
    "\n",
    "clean_train_df = raw_df.select('survived', 'pclass', 'sex', 'fare', 'age', 'sibsp', 'parch') \\\n",
    "                    .fillna({'age': 30}) \\\n",
    "                    .withColumn('sex',\n",
    "                        F.when(F.col('sex')=='male', 0)\n",
    "                        .otherwise(1))\\\n",
    "                    .withColumn('survived',\n",
    "                               F.col('survived').cast('int')) \\\n",
    "                    .withColumn('pclass',\n",
    "                               F.col('pclass').cast('int')) \\\n",
    "                    .withColumn('fare',\n",
    "                                F.col('fare').cast('int')) \\\n",
    "                    .withColumn('age',\n",
    "                               F.col('age').cast('int')) \\\n",
    "                    .withColumn('sibsp',\n",
    "                               F.col('sibsp').cast('int')) \\\n",
    "                    .withColumn('parch',\n",
    "                               F.col('parch').cast('int'))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's see how our \"clean\" dataframe looks like now:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------+------+---+----+---+-----+-----+\n",
      "|survived|pclass|sex|fare|age|sibsp|parch|\n",
      "+--------+------+---+----+---+-----+-----+\n",
      "|       0|     3|  0|   7| 22|    1|    0|\n",
      "|       1|     1|  1|  71| 38|    1|    0|\n",
      "|       1|     3|  1|   7| 26|    0|    0|\n",
      "|       1|     1|  1|  53| 35|    1|    0|\n",
      "|       0|     3|  0|   8| 35|    0|    0|\n",
      "|       0|     3|  0|   8| 30|    0|    0|\n",
      "|       0|     1|  0|  51| 54|    0|    0|\n",
      "|       0|     3|  0|  21|  2|    3|    1|\n",
      "|       1|     3|  1|  11| 27|    0|    2|\n",
      "|       1|     2|  1|  30| 14|    1|    0|\n",
      "|       1|     3|  1|  16|  4|    1|    1|\n",
      "|       1|     1|  1|  26| 58|    0|    0|\n",
      "|       0|     3|  0|   8| 20|    0|    0|\n",
      "|       0|     3|  0|  31| 39|    1|    5|\n",
      "|       0|     3|  1|   7| 14|    0|    0|\n",
      "|       1|     2|  1|  16| 55|    0|    0|\n",
      "|       0|     3|  0|  29|  2|    4|    1|\n",
      "|       1|     2|  0|  13| 30|    0|    0|\n",
      "|       0|     3|  1|  18| 31|    1|    0|\n",
      "|       1|     3|  1|   7| 30|    0|    0|\n",
      "+--------+------+---+----+---+-----+-----+\n",
      "only showing top 20 rows"
     ]
    }
   ],
   "source": [
    "clean_train_df.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The next step would be to create a *feature group* from our clean dataframe, so as to register it with the Project's Feature Store:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "computing descriptive statistics for : titanic_training_all_features, version: 1\n",
      "Running sql: use maggy_ablation_featurestore\n",
      "Feature group created successfully"
     ]
    }
   ],
   "source": [
    "# create a feature group from the training data DataFrame\n",
    "\n",
    "featurestore.create_featuregroup(clean_train_df,\n",
    "                                 'titanic_training_all_features',\n",
    "                                 description='titanic training dataset with clean features',\n",
    "                                 descriptive_statistics=True,\n",
    "                                 feature_correlation=False,\n",
    "                                 feature_histograms=False,\n",
    "                                 cluster_analysis=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now, we can forget about our previous \"clean\" dataframe that we read directly from the CSV file, and retrieve the training dataframe from the feature store:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Running sql: use maggy_ablation_featurestore\n",
      "SQL string for the query created successfully\n",
      "Running sql: SELECT * FROM titanic_training_all_features_1"
     ]
    }
   ],
   "source": [
    "# retrieve dataframe from feature store\n",
    "\n",
    "titanic_df = featurestore.get_featuregroup('titanic_training_all_features')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------+------+---+----+---+-----+-----+\n",
      "|survived|pclass|sex|fare|age|sibsp|parch|\n",
      "+--------+------+---+----+---+-----+-----+\n",
      "|       0|     3|  0|   7| 22|    1|    0|\n",
      "|       1|     1|  1|  71| 38|    1|    0|\n",
      "|       1|     3|  1|   7| 26|    0|    0|\n",
      "|       1|     1|  1|  53| 35|    1|    0|\n",
      "|       0|     3|  0|   8| 35|    0|    0|\n",
      "|       0|     3|  0|   8| 30|    0|    0|\n",
      "|       0|     1|  0|  51| 54|    0|    0|\n",
      "|       0|     3|  0|  21|  2|    3|    1|\n",
      "|       1|     3|  1|  11| 27|    0|    2|\n",
      "|       1|     2|  1|  30| 14|    1|    0|\n",
      "|       1|     3|  1|  16|  4|    1|    1|\n",
      "|       1|     1|  1|  26| 58|    0|    0|\n",
      "|       0|     3|  0|   8| 20|    0|    0|\n",
      "|       0|     3|  0|  31| 39|    1|    5|\n",
      "|       0|     3|  1|   7| 14|    0|    0|\n",
      "|       1|     2|  1|  16| 55|    0|    0|\n",
      "|       0|     3|  0|  29|  2|    4|    1|\n",
      "|       1|     2|  0|  13| 30|    0|    0|\n",
      "|       0|     3|  1|  18| 31|    1|    0|\n",
      "|       1|     3|  1|   7| 30|    0|    0|\n",
      "+--------+------+---+----+---+-----+-----+\n",
      "only showing top 20 rows"
     ]
    }
   ],
   "source": [
    "titanic_df.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Finally, we create a *training dataset* from the feature group. This is a very simple task using the Feature Store API. You can provide a name, and the data format for the dataset. For now, let's stick with `tfrecord`, TensorFlow's own file format."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "write feature frame, write_mode: overwrite\n",
      "Training Dataset created successfully"
     ]
    }
   ],
   "source": [
    "featurestore.create_training_dataset(titanic_df,\n",
    "                                    \"titanic_train_dataset\",\n",
    "                                    data_format='tfrecords',\n",
    "                                    descriptive_statistics=False,\n",
    "                                    feature_correlation=False,\n",
    "                                    feature_histograms=False,\n",
    "                                    cluster_analysis=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Done! you can now use the titanic training data in your Projects!"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "PySpark",
   "language": "",
   "name": "pysparkkernel"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "python",
    "version": 2
   },
   "mimetype": "text/x-python",
   "name": "pyspark",
   "pygments_lexer": "ipython3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
